【RabbitMQ】 您所在的位置:网站首页 rabbitmq 一个queue多个消费 【RabbitMQ】

【RabbitMQ】

2023-06-16 12:32| 来源: 网络整理| 查看: 265

目录 第二章 简单模式2.1 准备工作2.2. 添加依赖2.3. 消息生产者2.4. 消息消费者

第二章 简单模式

在这一部分中,我们将用 Java 编写两个程序。发送单个消息的生产者和接收消息并打印 出来的消费者。我们将介绍 Java API 中的一些细节。

在下图中,“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ 代 表使用者保留的消息缓冲区

在这里插入图片描述

2.1 准备工作

创建一个空项目 在这里插入图片描述 再创建一个maven模块 在这里插入图片描述

修改下项目和模块的语言级别 在这里插入图片描述

在这里插入图片描述

2.2. 添加依赖 org.apache.maven.plugins maven-compiler-plugin 8 8 com.rabbitmq amqp-client 5.8.0 commons-io commons-io 2.7 2.3. 消息生产者 public class Producer { // 队列名称 public static final String Queue_NAME = "hello"; //发消息 public static void main(String[] args) throws IOException, TimeoutException { // 创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置工厂IP:连接RabbitMQ的队列 factory.setHost("192.168.174.101"); // 用户名 factory.setUsername("admin"); // 密码 factory.setPassword("123"); // 创建连接 Connection connection = factory.newConnection(); // 获取信道 Channel channel = connection.createChannel(); /* 根据图示RabbitMQ的工作过程,我们应该通过连接交换机去连接队列,但我们这个简单的程序就先直接连接队列,跳过交换机 */ // 创建一个队列 /** * queueDeclare的参数: * 1.队列名称 * 2.队列里面的消息是否持久化(存储在磁盘中),默认情况消息存储在内存中 * 3.该队列是否只供一个消费者进行消费、是否进行消息共享,true:只能一个消费着消费 ,false:可以多个消费者消费 * 4.是否自动删除,最后一个消费者断开连接以后,该队列是否自动删除,true:自动删除,false:不自动删除 * 5.其他参数(暂时写为null) */ channel.queueDeclare(Queue_NAME,false,false,false,null); // 发消息 String message = "hello,world!"; // 初次使用 /** * 发送一个消息 * basicPublish的参数: * 1.发送到哪个交换机(本程序没有使用交换机,所以为空) * 2.路由的key值是哪个(本次是队列名称) * 3.其他参数信息(本次为null) * 4.发送的消息(不能直接发送消息,而是要发送对应的消息的二进制) */ channel.basicPublish("",Queue_NAME,null,message.getBytes()); System.out.println("消息发送完毕"); } }

运行成功后 在这里插入图片描述

去网页端查看:有个hello的队列 在这里插入图片描述

点击进入: 在这里插入图片描述

注意:发送消息程序会一直保持运行,如果断开程序,那么消息队列中的消息也会消失 在这里插入图片描述

2.4. 消息消费者 public class Consumer { // 队列名称 public static final String QUEUE_NAME = "hello"; // 要与生产者发送消息的队列名称保持一致 // 接受消息 public static void main(String[] args) throws Exception{ // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.174.101"); factory.setUsername("admin"); factory.setPassword("123"); Connection connection = factory.newConnection(); // 由连接创建一个信道 Channel channel = connection.createChannel(); // 回调是一个接口,需要实现接口(或者匿名内部类、lambda表达式的写法都可) // 声明 接收消息 DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(new String(message.getBody())); }; // 取消消费时的回调函数 CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费被中断"); }; /** * 消费者接受消息 * basicConsume方法参数: * 1.消费哪个队列 * 2.消费成功之后是否要自动应答,true:表示自动应答,false:手动应答 * 3.消费者未成功消费的回调(因为不是所有的情况下都能消费消息或者说接受消息) * 4.消费者取消消费的回调 */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }

运行消费者,接收到消息 在这里插入图片描述



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有